package com.huan.table

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, Kafka, Schema}

/**
 * Flink Table API job that reads CSV records from the Kafka topic `input`,
 * keeps the `id` and `temperature` columns of rows whose `id` is `sensor_1`,
 * and writes those rows as CSV to the Kafka topic `output`.
 */
object KafkaPipeline {

  /** Kafka broker list shared by the source and the sink table. */
  private val BootstrapServers = "Bigdata:9092"

  /** ZooKeeper quorum passed to the Kafka connector for both tables. */
  private val ZookeeperConnect = "Bigdata:2181"

  /** Kafka connector version used for both tables. */
  private val KafkaVersion = "0.11"

  def main(args: Array[String]): Unit = {
    // Streaming execution environment, run with a single parallel instance.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Table environment layered on top of the streaming environment.
    val tableEnv = StreamTableEnvironment.create(env)

    // Source table: CSV records (id, timestamp, temperature) read from topic "input".
    registerKafkaCsvTable(
      tableEnv,
      topic = "input",
      tableName = "inputTable",
      schema = new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )

    // Sink table: CSV records (id, temperature) written to topic "output".
    registerKafkaCsvTable(
      tableEnv,
      topic = "output",
      tableName = "outputTable",
      schema = new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
    )

    // Projection + filter: keep only id/temperature for sensor_1.
    val resultTable = tableEnv
      .from("inputTable")
      .select('id, 'temperature)
      .filter('id === "sensor_1")

    // NOTE(review): a groupBy/count aggregate produces an updating result; the Kafka table
    // sink is expected to be append-only in this Flink version — confirm before routing an
    // aggregate into "outputTable".
    resultTable.insertInto("outputTable")

    // Submit through the table environment so the insertInto pipeline is translated and run;
    // StreamExecutionEnvironment.execute only runs DataStream programs once Table and
    // DataStream execution are decoupled (Flink 1.11+).
    tableEnv.execute("KafkaPipeline")
  }

  /**
   * Registers a temporary table backed by a Kafka topic with CSV-encoded records.
   *
   * @param tableEnv  table environment the table is registered in
   * @param topic     Kafka topic the table reads from / writes to
   * @param tableName name under which the table is registered
   * @param schema    column layout of the CSV records
   */
  private def registerKafkaCsvTable(
      tableEnv: StreamTableEnvironment,
      topic: String,
      tableName: String,
      schema: Schema
  ): Unit =
    tableEnv
      .connect(
        new Kafka()
          .version(KafkaVersion)
          .topic(topic)
          .property("zookeeper.connect", ZookeeperConnect)
          .property("bootstrap.servers", BootstrapServers)
      )
      .withFormat(new Csv())
      .withSchema(schema)
      .createTemporaryTable(tableName)
}
